Skip to content

Commit

Permalink
Fix spammer state bugs (#37)
Browse files Browse the repository at this point in the history
* Fix shutdown of spammer if getting ledger state fails

* Recover if sending spam transactions failed

* Fix contextcheck linter warnings

* Fix revive linter warnings
  • Loading branch information
muXxer authored Aug 26, 2022
1 parent e13ff5f commit d83cb2b
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 28 deletions.
23 changes: 23 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ linters-settings:
initialisms: ["ACL", "API", "ASCII", "CPU", "CSS", "DNS", "EOF", "GUID", "HTML", "HTTP", "HTTPS", "ID", "IP", "JSON", "QPS", "RAM", "RPC", "SLA", "SMTP", "SQL", "SSH", "TCP", "TLS", "TTL", "UDP", "UI", "GID", "UID", "UUID", "URI", "URL", "UTF8", "VM", "XML", "XMPP", "XSRF", "XSS", "SIP", "RTP", "AMQP", "DB", "TS"]

linters:
# Disable all linters.
disable-all: true
# Enable specific linter
enable:
- deadcode
- errcheck
Expand Down Expand Up @@ -107,3 +110,23 @@ issues:
# Set to 0 to disable.
# Default: 3
max-same-issues: 0
#exclude:
# - 'Error return value of .((os\.)?std(out|err)\..*|.*Close|.*Flush|os\.Remove(All)?|.*print(f|ln)?|os\.(Un)?Setenv). is not checked' # errcheck
# - "err113: do not define dynamic errors, use wrapped static errors instead:" # goerr113
# - "type name will be used as [0-9A-Za-z_.]+ by other packages, and that stutters; consider calling this" # golint
# - "Potential file inclusion via variable" # gosec
# - "G404: Use of weak random number generator" # gosec
# - "Subprocess launch(ed with variable|ing should be audited)" # gosec
# - "Use of unsafe calls should be audited" # gosec
# - "G108: Profiling endpoint is automatically exposed on /debug/pprof" # gosec
# - "(Expect directory permissions to be 0750 or less|Expect file permissions to be 0600 or less)" # gosec
# - "G101: Potential hardcoded credentials" # gosec
# - "(G104|G307)" # gosec Duplicated errcheck checks.
# - "`[0-9A-Za-z_.]+` - `[0-9A-Za-z_.]+` always receives `[0-9A-Za-z_.]+`" # unparam
# - "should have comment .*or be unexported" # revive
# - "exported: comment on exported" # revive
# - "package-comments: package comment should be of the form" # revive
# - "blank-imports" # revive
# - "var-naming: don't use leading k in Go names;" #revive
# - 'shadow: declaration of "err"' # govet

6 changes: 5 additions & 1 deletion core/spammer/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func run() error {

CoreComponent.LogInfo("Starting API server...")

//nolint:contextcheck // false positive
_ = spammer.NewServer(deps.Spammer, e.Group(""))

go func() {
Expand All @@ -263,10 +264,13 @@ func run() error {
}

shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCtxCancel()

//nolint:contextcheck // false positive
if err := e.Shutdown(shutdownCtx); err != nil {
CoreComponent.LogWarn(err)
}
shutdownCtxCancel()

CoreComponent.LogInfo("Stopping API ... done")
}, daemon.PriorityStopSpammerAPI); err != nil {
CoreComponent.LogPanicf("failed to start worker: %s", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/spammer/cpu_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (c *CPUUsageUpdater) Run(ctx context.Context) {
return
}

//nolint:contextcheck // false positive
cpuUsagePSutil, err := cpu.Percent(c.sampleTime, false)
c.Lock()
if err != nil {
Expand Down
64 changes: 38 additions & 26 deletions pkg/spammer/spammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (

const (
IndexerQueryMaxResults = 1000
IndexerQueryTimeout = 30 * time.Second
IndexerQueryTimeout = 15 * time.Second
)

type outputState byte
Expand Down Expand Up @@ -234,7 +234,7 @@ func New(
workersCount = runtime.NumCPU() - 1
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), IndexerQueryTimeout)
defer cancel()

var err error
Expand Down Expand Up @@ -563,8 +563,8 @@ func (s *Spammer) startSpammerWorkers(valueSpamEnabled bool, bpsRateLimit float6
spammerWorkerCount = 1
}

var rateLimitChannel chan struct{} = nil
var rateLimitAbortSignal chan struct{} = nil
var rateLimitChannel chan struct{}
var rateLimitAbortSignal chan struct{}
currentProcessID := s.processID.Load()

if bpsRateLimit != 0.0 {
Expand Down Expand Up @@ -797,7 +797,7 @@ func (s *Spammer) Start(valueSpamEnabled *bool, bpsRateLimit *float64, cpuMaxUsa
workersCountCfg = 1
}

if err := s.getCurrentSpammerLedgerState(); err != nil {
if err := s.getCurrentSpammerLedgerState(context.Background()); err != nil {
return err
}

Expand Down Expand Up @@ -955,20 +955,12 @@ func (s *Spammer) ApplyNewLedgerUpdate(ctx context.Context, msIndex iotago.Miles

if conflicting {
// there was a conflict in the chain
// forget all known outputs
s.accountSender.ResetOutputs()
s.accountReceiver.ResetOutputs()

// recreate the pending transactions map
s.pendingTransactionsMap = make(map[iotago.BlockID]*pendingTransaction)
s.resetSpammerState()

// wait until the indexer got updated
if err := s.waitForIndexerUpdate(ctx, msIndex); err != nil {
return err
}

// reset the state if there was a conflict
s.outputState = stateBasicOutputCreate
} else {
// we only allow the spammer to create new transactions if there was no conflict in the last milestone.
s.currentLedgerMilestoneIndex = s.ledgerMilestoneIndex.Load()
Expand All @@ -980,22 +972,38 @@ func (s *Spammer) ApplyNewLedgerUpdate(ctx context.Context, msIndex iotago.Miles
// but there are no new outputs created by the spammer.
// in this case we query the indexer again to get the latest state.
if s.accountSender.Empty() && s.accountReceiver.Empty() && s.isValueSpamEnabled {
if err := s.getCurrentSpammerLedgerState(); err != nil {
return err
if err := s.getCurrentSpammerLedgerState(ctx); err != nil {
// there was an error getting the current ledger state
s.resetSpammerState()

s.LogWarnf("failed to get current spammer ledger state: %s", err.Error())
}
}

return nil
}

// resetSpammerState resets the spammer state in case of a conflict.
func (s *Spammer) resetSpammerState() {
// forget all known outputs
s.accountSender.ResetOutputs()
s.accountReceiver.ResetOutputs()

// recreate the pending transactions map
s.pendingTransactionsMap = make(map[iotago.BlockID]*pendingTransaction)

// reset the state if there was a conflict
s.outputState = stateBasicOutputCreate
}

// waitForIndexerUpdate waits until the indexer got updated to the expected milestone index.
func (s *Spammer) waitForIndexerUpdate(ctx context.Context, msIndex iotago.MilestoneIndex) error {

if s.indexer == nil {
return nodeclient.ErrIndexerPluginNotAvailable
}

ctxWaitForUpdate, cancelWaitForUpdate := context.WithTimeout(ctx, 10*time.Second)
ctxWaitForUpdate, cancelWaitForUpdate := context.WithTimeout(ctx, IndexerQueryTimeout)
defer cancelWaitForUpdate()

for ctxWaitForUpdate.Err() == nil {
Expand Down Expand Up @@ -1023,7 +1031,7 @@ func (s *Spammer) waitForIndexerUpdate(ctx context.Context, msIndex iotago.Miles
return ctxWaitForUpdate.Err()
}

func (s *Spammer) getCurrentSpammerLedgerState() error {
func (s *Spammer) getCurrentSpammerLedgerState(ctx context.Context) error {

if s.accountSender == nil || s.accountReceiver == nil {
return nil
Expand All @@ -1035,25 +1043,25 @@ func (s *Spammer) getCurrentSpammerLedgerState() error {

ts := time.Now()

ctx, cancel := context.WithTimeout(context.Background(), IndexerQueryTimeout)
defer cancel()
ctxQuery, cancelQuery := context.WithTimeout(ctx, IndexerQueryTimeout)
defer cancelQuery()

// only query basic outputs with native tokens if we want to melt them
allowNativeTokens := s.valueSpamCreateAlias && s.valueSpamCreateFoundry && s.valueSpamMintNativeToken && s.valueSpamMeltNativeToken

// get all known outputs from the indexer (sender)
if err := s.accountSender.QueryOutputsFromIndexer(ctx, s.indexer, allowNativeTokens, true, s.valueSpamCreateAlias, s.valueSpamCreateAlias, s.valueSpamCreateNFT, IndexerQueryMaxResults); err != nil {
if err := s.accountSender.QueryOutputsFromIndexer(ctxQuery, s.indexer, allowNativeTokens, true, s.valueSpamCreateAlias, s.valueSpamCreateAlias, s.valueSpamCreateNFT, IndexerQueryMaxResults); err != nil {
return err
}

receiversAliasOutputsUsed := s.valueSpamCreateAlias && s.valueSpamDestroyAlias && ((!s.valueSpamCreateFoundry || s.valueSpamDestroyFoundry) && (!s.valueSpamMintNativeToken || s.valueSpamMeltNativeToken))

// get all known outputs from the indexer (receiver)
if err := s.accountReceiver.QueryOutputsFromIndexer(ctx, s.indexer, allowNativeTokens, s.valueSpamCollectBasicOutput, receiversAliasOutputsUsed, receiversAliasOutputsUsed, s.valueSpamDestroyNFT, IndexerQueryMaxResults); err != nil {
if err := s.accountReceiver.QueryOutputsFromIndexer(ctxQuery, s.indexer, allowNativeTokens, s.valueSpamCollectBasicOutput, receiversAliasOutputsUsed, receiversAliasOutputsUsed, s.valueSpamDestroyNFT, IndexerQueryMaxResults); err != nil {
return err
}

s.LogDebugf(`getCurrentSpammerLedgerState finised, took: %v
s.LogDebugf(`getCurrentSpammerLedgerState finished, took: %v
outputs sender: basic: %d, alias: %d, foundry: %d, nft: %d
outputs receiver: basic: %d, alias: %d, foundry: %d, nft: %d`, time.Since(ts).Truncate(time.Millisecond),
s.accountSender.BasicOutputsCount(), s.accountSender.AliasOutputsCount(), s.accountSender.FoundryOutputsCount(), s.accountSender.NFTOutputsCount(),
Expand Down Expand Up @@ -1223,7 +1231,7 @@ func (s *Spammer) BuildTransactionPayloadBlockAndSend(ctx context.Context, spamB
senderAddress := spamBuilder.accountSender.Address()

// add all inputs
var remainder int64 = 0
var remainder int64
consumedInputIDs := iotago.OutputIDs{}
for _, input := range spamBuilder.consumedInputs {
remainder += int64(input.Output().Deposit())
Expand Down Expand Up @@ -1313,7 +1321,7 @@ func (s *Spammer) BuildTransactionPayloadBlockAndSend(ctx context.Context, spamB
}

// add all outputs and calculate the remainder
var remainderOutputIndex uint16 = 0
var remainderOutputIndex uint16
for i, outputWithOwnership := range spamBuilder.createdOutputs {
output := outputWithOwnership.Output

Expand Down Expand Up @@ -1404,6 +1412,10 @@ func (s *Spammer) BuildTransactionPayloadBlockAndSend(ctx context.Context, spamB

blockID, err := s.sendBlockFunc(ctx, block)
if err != nil {
// there was an error during sending a transaction
// it is high likely that something is non-solid or below max depth
s.resetSpammerState()

return nil, nil, fmt.Errorf("send transaction block failed, error: %w", err)
}

Expand All @@ -1423,7 +1435,7 @@ func (s *Spammer) BuildTransactionPayloadBlockAndSend(ctx context.Context, spamB

createdOutputs := make([]UTXOInterface, 0)

var outputIndex uint16 = 0
var outputIndex uint16
for _, outputWithOwnership := range spamBuilder.createdOutputs {
output := outputWithOwnership.Output
if output.Type() == iotago.OutputAlias {
Expand Down
5 changes: 4 additions & 1 deletion plugins/prometheus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,14 @@ func run() error {
Plugin.LogInfo("Stopping Prometheus exporter ...")

shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCtxCancel()

//nolint:contextcheck // false positive
err := deps.PrometheusEcho.Shutdown(shutdownCtx)
if err != nil {
Plugin.LogWarn(err)
}
shutdownCtxCancel()

Plugin.LogInfo("Stopping Prometheus exporter ... done")
}, daemon.PriorityStopPrometheus)
}
Expand Down

0 comments on commit d83cb2b

Please sign in to comment.